[FLINK] Support processing time tumble window for nexmark q12 #13

Open
KevinyhZou wants to merge 19 commits into bigo-sg:gluten-0530 from KevinyhZou:support_proc_time_window

@KevinyhZou KevinyhZou commented Oct 15, 2025

  1. support the window_start and window_end functions;
  2. support a processing-time timer service;
  3. support streaming aggregation by using Velox's StreamingAggregation.

@KevinyhZou KevinyhZou changed the title from "[FLINK]Support processing time tumble window" to "[FLINK]Support processing time tumble window for nexmark q12" Oct 15, 2025
@xinghuayu007 xinghuayu007 mentioned this pull request Oct 28, 2025
// Look for the end of the last group.
vector_size_t index = 0;
- if (prevInput_) {
+ if (prevInput_ && numGroups_ > 0) {
Member

Why add this condition?

RowVectorPtr output;

- if (numGroups_ > minOutputBatchSize_) {
+ if (numGroups_ >= minOutputBatchSize_) {
Member

Is this change necessary?

Author

@KevinyhZou KevinyhZou Dec 16, 2025

Yes. In Velox, minOutputBatchSize cannot be set lower than 1, so numGroups > minOutputBatchSize means that nothing is output when there is only a single row of data, which does not fit the streaming scenario.
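
The boundary case in this reply can be checked in isolation. The following is a minimal sketch, not the Velox code: shouldFlushStrict and shouldFlushInclusive are hypothetical helpers that mirror only the two comparisons shown in the diff.

```cpp
#include <cstdint>

// Hypothetical stand-ins for the two conditions in the diff. The real check
// lives inside the streaming aggregation operator and has more context.
constexpr bool shouldFlushStrict(int32_t numGroups, int32_t minOutputBatchSize) {
  return numGroups > minOutputBatchSize; // original condition
}

constexpr bool shouldFlushInclusive(int32_t numGroups, int32_t minOutputBatchSize) {
  return numGroups >= minOutputBatchSize; // condition proposed in this PR
}

// With the smallest batch size (1) and a single group, only the inclusive
// comparison produces output.
static_assert(!shouldFlushStrict(1, 1), "strict form holds back a single group");
static_assert(shouldFlushInclusive(1, 1), "inclusive form emits a single group");
```

Under this reading, the strict comparison needs at least two groups before anything is emitted, which is the behavior the author wants to avoid for streaming input.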

}
}

long TimeWindowUtil::getCurrentProcessingTime() {
Member

Use int64_t instead of long.


static long cleanupTime(long maxTimestamp, long allowedLateness_, bool isEventTime);

static long getCurrentProcessingTime();
Member

@zhanglistar zhanglistar Dec 2, 2025

static int64_t getCurrentProcessingTime(); since sizeof(long) may be 4 bytes on a 32-bit machine.
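
A fixed-width return type could look roughly like the sketch below. It assumes the value is wall-clock milliseconds taken from std::chrono::system_clock, which is only what the time_since_epoch() fragment in the diff suggests. The name currentProcessingTimeMs is illustrative and is not the PR's function. Besides 32-bit targets, long is also 4 bytes on 64-bit Windows.

```cpp
#include <chrono>
#include <cstdint>
#include <limits>

// Sketch only: returns milliseconds since the clock's epoch as a 64-bit value.
// The name and the choice of system_clock are assumptions for illustration.
inline int64_t currentProcessingTimeMs() {
  const auto now = std::chrono::system_clock::now();
  return std::chrono::duration_cast<std::chrono::milliseconds>(
             now.time_since_epoch())
      .count();
}

// int64_t always has 63 value bits, whereas the width of long depends on the
// platform's data model.
static_assert(std::numeric_limits<int64_t>::digits == 63, "int64_t is 64-bit");
```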

now.time_since_epoch()).count();
return {{timestamp_ms, input}};
long timestamp_ms = TimeWindowUtil::getCurrentProcessingTime();
if (windowType_ == 1) { // tumble window
Member

Use an enum for windowType.
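
One possible shape for such an enum is sketched here. Only the value 1 for a tumble window comes from the diff. The type name, the enumerator name and the helper are assumptions, and other window kinds are left out because their numeric values are not shown.

```cpp
#include <cstdint>

// Assumed names. Only "1 means tumble window" is taken from the diff.
enum class WindowType : int32_t {
  kTumble = 1,
};

// Hypothetical helper replacing the magic-number comparison windowType_ == 1.
constexpr bool isTumble(WindowType type) {
  return type == WindowType::kTumble;
}
```

A call site would then read if (isTumble(windowType_)) instead of comparing against a literal.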

ProcessingTimeCallback callback_;
};

class ProcessingTimeSerivice {
Member

Typo: Serivice -> Service.

return TimeWindowUtil::getCurrentProcessingTime();
}
virtual std::optional<std::string> registerTimer(long timestamp, ProcessingTimerTask target) {
std::optional<std::string> task;
Member

@zhanglistar zhanglistar Dec 2, 2025

Use a pure virtual function: virtual std::optional<std::string> registerTimer(long timestamp, ProcessingTimerTask target) = 0;
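
To illustrate the suggestion, here is a small self-contained sketch of an abstract timer service. ProcessingTimerTask is replaced by a std::function stand-in because its definition is not shown in the diff, and ImmediateTimeService is a hypothetical implementation that exists only to make the example complete.

```cpp
#include <cstdint>
#include <functional>
#include <optional>
#include <string>

// Stand-in for the PR's ProcessingTimerTask, whose definition is not shown.
using ProcessingTimerTask = std::function<void(int64_t)>;

class ProcessingTimeService {
 public:
  virtual ~ProcessingTimeService() = default;

  // Pure virtual: every concrete service must provide its own scheduling, so
  // an empty default cannot be inherited by accident.
  virtual std::optional<std::string> registerTimer(
      int64_t timestamp,
      ProcessingTimerTask target) = 0;
};

// Hypothetical implementation that runs the task immediately.
class ImmediateTimeService : public ProcessingTimeService {
 public:
  std::optional<std::string> registerTimer(
      int64_t timestamp,
      ProcessingTimerTask target) override {
    target(timestamp);
    return "proc_time_task_" + std::to_string(timestamp);
  }
};
```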

virtual void cancel(const std::string& task) {}
virtual void close() {}

void finish(const std::string& task) {
Member

finish is not quite accurate; this should be unregister, right?

return "proc_time_task_" + std::to_string(timestamp);
}
protected:
std::vector<std::string> registry;
Member

Would std::set be better? A vector is not suited to frequent lookups and removals.
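
A set-based registry might look like the following sketch. TimerRegistry and its method names are hypothetical. In the PR the registry is a protected member of the service class rather than a separate type.

```cpp
#include <cstddef>
#include <set>
#include <string>

// Hypothetical helper that keeps registered timer task names in a std::set.
class TimerRegistry {
 public:
  // Returns false if the name was already registered.
  bool add(const std::string& task) {
    return names_.insert(task).second;
  }

  // Erase by key is O(log n); a std::vector would need a linear scan first.
  bool unregister(const std::string& task) {
    return names_.erase(task) > 0;
  }

  bool contains(const std::string& task) const {
    return names_.count(task) > 0;
  }

  void clear() {
    names_.clear();
  }

  std::size_t size() const {
    return names_.size();
  }

 private:
  std::set<std::string> names_;
};
```

If ordering of the names is never needed, std::unordered_set would be an alternative with average constant-time operations.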


namespace facebook::velox::stateful {

using ProcessingTimeCallback = std::function<void(long)>;
Member

Change long to int64_t everywhere to avoid overflow on 32-bit systems.

}

std::string generateTimerTaskName(long timestamp) {
return "proc_time_task_" + std::to_string(timestamp);
Member

@zhanglistar zhanglistar Dec 2, 2025

If several timers are registered in the same millisecond, these names could collide. Add a counter:

static std::atomic<uint64_t> counter{0};
return "proc_time_task_" + std::to_string(timestamp) + "_" + std::to_string(counter++);
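
Written out as a complete function, the suggestion could look like this sketch. It keeps the name generateTimerTaskName from the diff but uses int64_t for the timestamp, following the other review comments. The function-local static counter is one possible placement, not necessarily the one the PR will use.

```cpp
#include <atomic>
#include <cstdint>
#include <string>

// Sketch: appends a process-wide counter so that two timers registered for
// the same millisecond still receive distinct names.
inline std::string generateTimerTaskName(int64_t timestamp) {
  static std::atomic<uint64_t> counter{0};
  return "proc_time_task_" + std::to_string(timestamp) + "_" +
      std::to_string(counter.fetch_add(1));
}
```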

void close() override {
if (executor_) {
executor_->shutdown();
}
Member

registry.clear(); also clear the registered names here.


/// This class is relevent to flink KeySelector.
/// It can partition the RowVector according to the key fields.
class KeySelector {
Member

Shouldn't KeySelector be KeyPartitioner?

Author

Yes, it partitions by key.

@@ -31,7 +31,7 @@
memory::MemoryPool* pool,
int numPartitions = INT_MAX);
Member

The default value is unreasonable; set it to 1024.

}

- std::map<uint32_t, RowVectorPtr> KeySelector::partition(const RowVectorPtr& input) {
+ std::map<uint64_t, RowVectorPtr> KeySelector::partition(const RowVectorPtr& input) {
Member

@zhanglistar zhanglistar Dec 2, 2025

This interface is a bit heavy and should not use a map. That is unrelated to this change, though, so it can be fixed later.

}
}

void KeySelector::allocateIndexBuffers(
Member

@zhanglistar zhanglistar Dec 2, 2025

The implementation of this interface is heavy; something like this would be better:

struct IndexBuffers {
  std::vector<BufferPtr> buffers;
  std::vector<vector_size_t*> rawPtrs;
};
IndexBuffers allocateIndexBuffers(const std::vector<vector_size_t>& counts);

#include "velox/experimental/stateful/Triggerable.h"
#include "velox/experimental/stateful/window/SliceAssigner.h"
#include "velox/experimental/stateful/window/WindowBuffer.h"
#include <memory>
Member

Put the standard library headers first.

StatefulOperator::close();
- localAggerator_->close();
+ if (localAggerator_) {
+   localAggerator_->close();
Member

It should be reset here, right?
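
The close-then-reset pattern asked about here can be sketched as follows. It assumes the local aggregator is held in a std::unique_ptr, which the std::move in the constructor suggests but the diff does not confirm. Aggregator and SketchOperator are hypothetical stand-ins.

```cpp
#include <memory>
#include <utility>

// Hypothetical stand-in for the local aggregator; only close() matters here.
struct Aggregator {
  void close() {}
};

class SketchOperator {
 public:
  explicit SketchOperator(std::unique_ptr<Aggregator> local)
      : localAggregator_(std::move(local)) {}

  void close() {
    if (localAggregator_) {
      localAggregator_->close();
      // Release the object so that a second close() becomes a no-op.
      localAggregator_.reset();
    }
  }

  bool hasLocalAggregator() const {
    return localAggregator_ != nullptr;
  }

 private:
  std::unique_ptr<Aggregator> localAggregator_;
};
```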

#include "velox/experimental/stateful/window/TimeWindowUtil.h"

#include <list>
#include <memory>
Member

Put these first.

StatefulOperator::initialize();
- localAggerator_->initialize();
+ if (localAggerator_) {
+   localAggerator_->initialize();
Member

Can initialization fail?

const int windowStartIndex,
const int windowEndIndex)
: StatefulOperator(std::move(globalAggerator), std::move(targets)),
localAggerator_(std::move(localAggerator)),
Member

Typo: localAggerator -> localAggregator.

@@ -149,7 +272,9 @@ long WindowAggregator::sliceStateMergeTarget(long sliceToMerge) {
void WindowAggregator::close() {
processWatermarkInternal(INT_MAX);
Member

processWatermarkInternal(std::numeric_limits<int64_t>::max());
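
The reason INT_MAX is too small as a final watermark can be shown with a short calculation. The sketch assumes timestamps are epoch milliseconds and that int is 32 bits wide. windowFires is a hypothetical firing rule used only for the comparison, not the PR's logic.

```cpp
#include <climits>
#include <cstdint>
#include <limits>

constexpr int64_t kMillisPerDay = 24LL * 60 * 60 * 1000;

// With a 32-bit int, INT_MAX milliseconds is less than 25 days after the epoch.
static_assert(INT_MAX / kMillisPerDay == 24, "INT_MAX ms is about 24.8 days");

// Hypothetical firing rule: a window fires once the watermark reaches its end.
constexpr bool windowFires(int64_t windowEndMs, int64_t watermarkMs) {
  return windowEndMs <= watermarkMs;
}

// The largest int64_t value is at or beyond every representable window end.
constexpr int64_t kFinalWatermark = std::numeric_limits<int64_t>::max();
```

Under these assumptions, a window ending at a present-day timestamp would never fire on close() with INT_MAX, while it does with the 64-bit maximum.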

const RowVectorPtr& output,
const std::string& fieldName,
const TypePtr& fieldType,
const long fieldValue,
Member

int64_t fieldValue

RowVectorPtr addWindowTimestampToOutput(
const RowVectorPtr& output,
const std::string& fieldName,
const TypePtr& fieldType,
Member

@zhanglistar zhanglistar Dec 2, 2025

This can be removed; only TimestampType is supported.

std::list<RowVectorPtr> allDatas;
for (const auto& data: datas) {
allDatas.push_back(data);
}
Member

std::vector<RowVectorPtr> allDatas(datas.begin(), datas.end());


template<typename K>
void WindowAggregator::fireWindow(K key, long timerTimestamp, long windowEnd) {
RowVectorPtr output = windowState_->value(key, windowEnd);
Member

  if (!output) {
    LOG(INFO) << "No output found for key: " << key << ", window end: " << windowEnd;
    return;
  }

windowState_->remove(key, windowEnd);
}

void WindowAggregator::onEventTime(std::shared_ptr<TimerHeapInternalTimer<uint32_t, long>> timer) {
Member

Does currentProgress_ need to be updated?

onTimer(timer);
}

void WindowAggregator::onProcessingTime(std::shared_ptr<TimerHeapInternalTimer<uint32_t, long>> timer) {
Member

Does currentProgress_ need to be updated?


template<typename K>
void WindowAggregator::clearWindow(K key, long timerTimestamp, long windowEnd) {
windowState_->remove(key, windowEnd);
Member

Does windowBuffer_ need to be cleaned up?

size_ = 0;
}

bool empty() override {
Member

@zhanglistar zhanglistar Dec 2, 2025

bool empty() const override

Member

@zhanglistar zhanglistar left a comment

Check.


namespace facebook::velox::stateful {

static int roundUpToPowerOfTwo(int32_t x) {

Refer to the ClickHouse implementation:

template <typename T>
requires std::is_integral_v<T> && (sizeof(T) == sizeof(UInt64))
inline T roundDownToPowerOfTwo(T x)
{
    return x <= 0 ? 0 : (T(1) << (63 - __builtin_clzll(x)));
}

Velox probably already has an implementation of a common conversion like this.
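
Note that the quoted snippet rounds down, whereas the function under review is named roundUpToPowerOfTwo. For comparison, a portable round-up variant can be sketched without compiler builtins. This is a generic bit-smearing version for illustration, not the PR's code and not a Velox API. Treating 0 as 1 is an assumption about the desired behavior.

```cpp
#include <cstdint>

// Sketch: smallest power of two that is >= x. Inputs 0 and 1 both yield 1.
// For x above 2^63 the result does not fit in 64 bits and wraps around to 0.
inline uint64_t roundUpToPowerOfTwo(uint64_t x) {
  if (x <= 1) {
    return 1;
  }
  uint64_t v = x - 1;
  // Copy the highest set bit into every lower position, then add one.
  v |= v >> 1;
  v |= v >> 2;
  v |= v >> 4;
  v |= v >> 8;
  v |= v >> 16;
  v |= v >> 32;
  return v + 1;
}
```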

int numPartitions = 1024);

- std::map<uint32_t, RowVectorPtr> partition(const RowVectorPtr& input);
+ std::map<int64_t, RowVectorPtr> partition(const RowVectorPtr& input);

Use a type alias for the key. The type will probably need to change later, since collisions are still possible with a 64-bit hash value. Also switch map to unordered_map.
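
A minimal sketch of that suggestion follows. PartitionKey, PartitionMap and partitionByKey are hypothetical names, and a std::vector<int64_t> stands in for RowVectorPtr so that the example is self-contained. The modulo assignment is illustrative and is not the PR's hashing scheme.

```cpp
#include <cstdint>
#include <unordered_map>
#include <vector>

// One alias to edit if the key type has to change later.
using PartitionKey = uint64_t;

// Stand-in for RowVectorPtr; here a partition simply holds the raw keys.
using Rows = std::vector<int64_t>;

using PartitionMap = std::unordered_map<PartitionKey, Rows>;

// Groups keys by (key mod numPartitions). numPartitions must be non-zero.
inline PartitionMap partitionByKey(const Rows& keys, uint64_t numPartitions) {
  PartitionMap result;
  for (const int64_t key : keys) {
    const PartitionKey partition = static_cast<uint64_t>(key) % numPartitions;
    result[partition].push_back(key);
  }
  return result;
}
```

Callers that only iterate over the result do not depend on the ordering std::map provides, so the unordered container should be a drop-in change for them.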
